<!-- <html> -->
<head>
<title>Demonstrating Akka Streams</title>
</head>
<body>

<div>
<p>
This tutorial contains a few samples that demonstrate Akka Streams.
</p>

<p>
Akka Streams is an implementation of
<a href="http://www.reactive-streams.org/" target="_blank">Reactive Streams</a>,
which is a standard for asynchronous stream processing with non-blocking backpressure.
Akka Streams is interoperable with other Reactive Streams implementations.
</p>

<p>
Akka Streams is currently under development and these samples use a preview release, i.e.
changes can be expected. Please try it out and send feedback to the
<a href="http://groups.google.com/group/akka-user" target="_blank">Akka mailing list</a>.
</p>

<p>
Akka Streams provides a way to express and run a chain of asynchronous processing
steps acting on a sequence of elements. Every step is processed by one actor to
support parallelism. The user describes the “what” instead of the “how”, i.e. things like
batching, buffering, thread-safety are handled behind the scenes.
</p>

<p>
The processing steps are declared with a DSL, the so-called <code>Flow</code>.
A <code>Flow</code> may be connected to a <code>Source</code> and/or a
<code>Sink</code>. It may also exist without either of these end points, as an
"open" flow. Any open flow when connected to a <code>Source</code> itself becomes a
<code>Source</code> and likewise when connected to a <code>Sink</code> becomes a
<code>Sink</code>. A <code>Flow</code> with both a <code>Source</code> and a
<code>Sink</code> is called a <code>RunnableFlow</code> and may be executed.
</p>
<p>
The <code>Source</code> can be constructed from a collection, an iterator,
a future, or a function which is evaluated repeatedly.
</p>

<p>
Each DSL element produces a new Flow that can be further transformed, building
up a description of the complete transformation pipeline. In order to execute
this pipeline the <code>Flow</code> must be runnable (have both <code>Source</code>
and <code>Sink</code> endpoints). It is then materialized by calling one of the execution
methods, which include <code>.run</code>, <code>.runWith</code> and <code>.runForeach</code>.
</p>

<p>
Running a <code>Flow</code> involves a process called materialization, which requires
a <code>Materializer</code> (such as the <code>akka.stream.ActorMaterializer</code>) configured for an actor system.
</p>

<p>
It should be noted that the streams modeled by this library are “hot”,
meaning that they asynchronously flow through a series of processors without
detailed control by the user. In particular it is not predictable how many
elements a given transformation step might buffer before handing elements
downstream, which means that transformation functions may be invoked more
often than for corresponding transformations on strict collections like
<code>List</code>. <b>An important consequence</b> is that elements that were
produced into a stream may be discarded by later processors, e.g. when using the
<code>take</code> combinator.
</p>

<p>
By default every operation is executed within its own <code>Actor</code>
to enable full pipelining of the chained set of computations. This behavior
is determined by the <code>akka.stream.ActorMaterializer</code> which is required
by those methods that materialize the Flow into a series of
<code>org.reactivestreams.Processor</code> instances that are started and active.
Synchronous compaction of steps is possible (but not yet implemented).
</p>

</div>
<div>
<h2>Basic transformation</h2>

<p>
What does a <code>Flow</code> look like?
</p>

<p>
Open <a href="#code/src/main/java/sample/stream/BasicTransformation.java" class="shortcut">BasicTransformation.java</a>
</p>

<p>
Here the input producer is an <code>Iterator</code> over the array produced by splitting
the text on spaces.
Note that the iterator is an <code>Iterator&lt;String&gt;</code> and this produces
a <code>Source&lt;String&gt;</code>. The flow written to use this <code>Source</code>
must match the type, so we could not treat the source as a source of <code>Integer</code>,
for example.
</p>

<p>
In this sample we convert each element read from the source to upper case and print it
to the console. This is done in the lines <code>map(e -> e.toUpperCase())</code> and
<code>runForeach(System.out::println, materializer)</code>.
</p>

<p>
The <code>map(e -> e.toUpperCase())</code> step takes Strings and produces
Strings. Behind the scenes, this constructs a <code>Transformer&lt;String, String&gt;</code>
which is itself a <code>Flow</code>.
When this is attached to the <code>Source&lt;String&gt;</code>, the result is
a new <code>Flow</code> that is also a <code>Source&lt;String&gt;</code>. If the map
was over a function that converted, say, String to Integer, the result would be a
<code>Source&lt;Integer&gt;</code> when attaching it to this <code>Source&lt;String&gt;</code>.
</p>

<p>
The <code>runForeach(System.out::println, materializer)</code> call constructs and attaches a <code>Sink</code>, in
this case one created by <code>Sink.foreach</code>, and again this is
specifically a <code>Sink</code> of <code>String</code> elements, which matches the type of the
<code>Source&lt;String&gt;</code>. Attaching this matching <code>Sink</code>
to the <code>Source</code> creates a <code>RunnableFlow</code>, which is then also run
by the <code>runForeach</code> call.
</p>

<p>Unlike <code>forEach</code> on a collection (which returns <code>void</code>),
the <code>runForeach</code> on a Flow returns a <code>CompletionStage&lt;Done&gt;</code>. Because
we get a <code>CompletionStage</code> back, we can use it to shut down the actor system once the
flow has completed. This is accomplished by the final call in the flow:
</p>
<pre><code>
handle((done, failure) -> {
  system.terminate();
  return NotUsed.getInstance();
});
</code></pre>
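<p>
The <code>handle</code> callback is invoked both when the stage completes normally and when it
fails, which is why it is a convenient place for shutting things down. The following is a minimal
sketch of that behavior using only the JDK; no Akka is involved, and the class name
<code>HandleSketch</code> and the counter standing in for <code>system.terminate()</code> are
made up for illustration.
</p>

<pre><code>
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

public class HandleSketch {
    // Hypothetical stand-in for system.terminate(): counts how often cleanup ran.
    static final AtomicInteger cleanups = new AtomicInteger();

    // Attaches a callback that runs on success and on failure alike.
    static CompletionStage&lt;String> withCleanup(CompletionStage&lt;String> stage) {
        return stage.handle((value, failure) -> {
            cleanups.incrementAndGet();
            return failure == null
                ? "completed: " + value
                : "failed: " + failure.getMessage();
        });
    }

    public static void main(String[] args) {
        CompletableFuture&lt;String> ok = CompletableFuture.completedFuture("done");
        CompletableFuture&lt;String> broken = new CompletableFuture&lt;>();
        broken.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(withCleanup(ok).toCompletableFuture().join());
        System.out.println(withCleanup(broken).toCompletableFuture().join());
        System.out.println("cleanup ran " + cleanups.get() + " times");
    }
}
</code></pre>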

<p>
Try to run the <code>sample.stream.BasicTransformation</code> class
by selecting it in the 'Main class' menu in the <a href="#run" class="shortcut">Run</a> tab
and click the 'Run' button.
</p>

<p>
Try to add additional steps in the flow, for example skip short lines:
</p>

<pre><code>
filter(line -> line.length() > 3).
</code></pre>

<p>The API is intended to be familiar to anyone used to the collections API in Scala.</p>
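<p>
For comparison, here is roughly the same pipeline (split on spaces, drop short elements, convert to
upper case) written against a plain in-memory collection with <code>java.util.stream</code>.
This is only an analogy built on the JDK, not Akka Streams code: there is no asynchronous boundary,
no materializer and no backpressure. The class name and the sample text are invented for the example.
</p>

<pre><code>
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CollectionAnalogy {
    // Same shape as the stream sample: source -> filter -> map -> sink.
    static List&lt;String> transform(String text) {
        return Arrays.stream(text.split(" "))
            .filter(word -> word.length() > 3)   // skip short elements
            .map(String::toUpperCase)            // String in, String out
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        transform("a few short and some longer words").forEach(System.out::println);
    }
}
</code></pre>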

<p>
All stream manipulation operations can be found in the
<a href="http://doc.akka.io/api/akka/2.4.6/#akka.stream.javadsl.package" target="_blank">API documentation</a>.
</p>

</div>
<div>
<h2>Backpressure</h2>

<p>
The mandatory non-blocking backpressure is a key feature of
<a href="http://www.reactive-streams.org/" target="_blank">Reactive Streams</a>.
</p>

<p>
Open <a href="#code/src/main/java/sample/stream/WritePrimes.java" class="shortcut">WritePrimes.java</a>
</p>

<p>
In this sample we use a fast producer and several consumers, with potentially different throughput capacity.
To avoid out-of-memory problems it is important that the producer does not generate elements faster than
what can be consumed. Also the speed of the slowest consumer must be taken into account to avoid
unbounded buffering in intermediate steps.
</p>
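<p>
The idea that a producer only generates what the consumer has asked for can be sketched with the
<code>java.util.concurrent.Flow</code> interfaces, which are the JDK's (Java 9+) copy of the Reactive Streams
interfaces. This is a deliberately simplified, single-threaded illustration and not how Akka Streams is
implemented; a real publisher must follow all rules of the Reactive Streams specification, including
thread safety. The names <code>DemandDrivenSource</code>, <code>fromGenerator</code> and
<code>OneByOne</code> are hypothetical.
</p>

<pre><code>
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DemandDrivenSource {

    // Publisher that invokes the generator only when demand has been signalled.
    static &lt;T> Flow.Publisher&lt;T> fromGenerator(Supplier&lt;T> generator) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long demand = 0;
            private boolean emitting = false;
            private boolean cancelled = false;

            @Override
            public void request(long n) {
                if (cancelled) return;
                if (n &lt;= 0) {
                    cancelled = true;
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                demand += n;
                if (emitting) return;   // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                try {
                    while (demand > 0 &amp;&amp; !cancelled) {
                        demand--;
                        subscriber.onNext(generator.get());
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Subscriber that asks for one element at a time and cancels after `limit` (>= 1) elements.
    static final class OneByOne&lt;T> implements Flow.Subscriber&lt;T> {
        final List&lt;T> received = new ArrayList&lt;>();
        private final int limit;
        private Flow.Subscription subscription;

        OneByOne(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (received.size() >= limit) subscription.cancel();
            else subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Flow.Publisher&lt;Integer> source = fromGenerator(calls::incrementAndGet);
        OneByOne&lt;Integer> sink = new OneByOne&lt;>(3);
        source.subscribe(sink);
        System.out.println(sink.received + " after " + calls.get() + " generator calls");
    }
}
</code></pre>

<p>
Although the generator could produce elements without limit, it is called exactly as often as the
subscriber requests, so nothing has to be buffered in between.
</p>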

<p>
Here we use a random number generator as input. The input producer is a block of code which is
evaluated repeatedly. It can generate elements very fast if needed.
</p>

<p>
We filter the numbers through two prime number checks and end up with a stream of
prime numbers whose neighbor (the number + 2) is also a prime number. These two filter steps
can potentially be pipelined, i.e. executed in parallel.
</p>
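<p>
The two checks described above can be sketched as plain predicates applied one after the other.
The sketch below uses <code>java.util.stream</code> over a fixed range instead of a random Akka
<code>Source</code>, so it shows only the filtering logic, not the pipelining. The class and method names
are invented, and the trial-division test is an assumption; the sample may check primality differently.
</p>

<pre><code>
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TwinPrimeFilter {
    // Trial division; adequate for a sketch, not for large inputs.
    static boolean isPrime(int n) {
        if (n &lt; 2) return false;
        for (int i = 2; (long) i * i &lt;= n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    // Keeps every p in [fromInclusive, toExclusive) where both p and p + 2 are prime.
    static List&lt;Integer> primesWithPrimeNeighbor(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
            .filter(TwinPrimeFilter::isPrime)   // first check: the number itself
            .filter(n -> isPrime(n + 2))        // second check: its neighbor + 2
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(primesWithPrimeNeighbor(1, 50));
    }
}
</code></pre>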

<p>
Then we connect that prime number producer to two consumers: one writes to a file and the other
prints to the console. To simulate a slow file writer we have added an additional
sleep in a map stage right before the <code>SynchronousFileSink</code>.
</p>

<p>
The connections are made using the FlowGraph DSL: we use the <code>FlowGraph.factory().closed(...)</code>
method to construct a runnable graph (substituting 'partial' for 'closed' would create one that has open
inputs or outputs that remain to be connected). The first argument is the 'runForeach' Sink that materializes
to a <code>Future</code> that we can use to detect (abnormal) stream termination, the second argument is
a lambda expression that takes a <code>FlowGraph.Builder</code> and the imported 'slowSink' shape and
performs the actual wiring.
</p>

<p>
The first step is to create and import a broadcast node with two outputs, then we use the builder to
connect the source via the broadcast to both sinks.
</p>

<p>
Try to run the <code>sample.stream.WritePrimes</code> class
by selecting it in the 'Main class' menu in the <a href="#run" class="shortcut">Run</a> tab
and click the 'Run' button.
</p>

<p>
Note that the speed of the output in the console is limited by the slow file writer, i.e.
one element per second.
</p>

<p>
Open <a href="#code/target/primes.txt" class="shortcut">primes.txt</a> to see
the file output.
</p>

</div>
<div>
<h2>Stream of streams</h2>

<p>
Let us take a look at an example of more advanced stream manipulation.
</p>

<p>
Open <a href="#code/src/main/java/sample/stream/GroupLogFile.java" class="shortcut">GroupLogFile.java</a>
</p>

<p>
We want to read a log file and pipe entries of different log level to separate files.
</p>

<p>
In this sample we extract the level with a regular expression matching the log levels and then
write the elements of each group to a separate file.
</p>
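<p>
The extract-then-group step can be illustrated on an in-memory list with the JDK alone. This is a
sketch under assumptions: the pattern, the set of level names and the <code>OTHER</code> fallback
are guesses made for the example and may differ from what <code>GroupLogFile.java</code> uses, and the
groups are collected into a map here rather than streamed to separate files.
</p>

<pre><code>
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class LogLevelGrouping {
    // Assumed format: the level appears as an upper-case word somewhere in the line.
    private static final Pattern LEVEL = Pattern.compile("\\b(DEBUG|INFO|WARN|ERROR)\\b");

    // Returns the first level found in the line, or "OTHER" if none matches.
    static String levelOf(String line) {
        Matcher m = LEVEL.matcher(line);
        return m.find() ? m.group(1) : "OTHER";
    }

    // Groups lines by extracted level, keeping the original order within each group.
    static Map&lt;String, List&lt;String>> groupByLevel(List&lt;String> lines) {
        return lines.stream()
            .collect(Collectors.groupingBy(LogLevelGrouping::levelOf, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List&lt;String> lines = Arrays.asList(
            "[INFO] service started",
            "[ERROR] connection refused",
            "[INFO] request handled",
            "line without a level");
        groupByLevel(lines).forEach((level, group) -> System.out.println(level + ": " + group));
    }
}
</code></pre>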

<p>
Try to run the <code>sample.stream.GroupLogFile</code> class
by selecting it in the 'Main class' menu in the <a href="#run" class="shortcut">Run</a> tab
and click the 'Run' button.
</p>

<p>
Open the input <a href="#code/src/main/resources/logfile.txt" class="shortcut">logfile.txt</a>
and look at the resulting output log files in the
<a href="#code/target" class="shortcut">target</a> directory.
</p>

</div>
<div>
<h2>TCP Stream</h2>

<p>
Akka Streams also provides a stream based API on top of
<a href="http://doc.akka.io/docs/akka/2.4.6/java/io.html" target="_blank">Akka I/O</a>.
</p>

<p>
Open <a href="#code/src/main/java/sample/stream/TcpEcho.java" class="shortcut">TcpEcho.java</a>
</p>

<p>
When you <a href="#run" class="shortcut">Run</a> <code>TcpEcho</code> without parameters it starts
both client and server in the same JVM and the client connects to the server over port 6000.
</p>

<p>
The server is started by calling <code>bind</code> on the <code>akka.stream.javadsl.Tcp</code> extension.
It returns a <code>StreamTcp.ServerBinding</code>.
Each new client connection is represented by a new <code>IncomingConnection</code>
element produced by the <code>connections</code> <code>Source&lt;StreamTcp.IncomingConnection&gt;</code> of the
<code>ServerBinding</code>. From the connection the server can operate on the <code>ByteString</code>
elements.
</p>

<p>
In this sample the server sends back the same bytes as it receives.
</p>
<pre><code>
conn.handleWith(Flow.&lt;ByteString>create(), materializer);
</code></pre>

<p>
You can add transformation of the bytes using a <code>Flow</code>. For example convert characters to
upper case.
</p>
<pre><code>
final Flow&lt;ByteString, ByteString, NotUsed> toUpper =
    Flow.&lt;ByteString>create().map(byteStr -> ByteString.fromString(byteStr.utf8String().toUpperCase()));
conn.handleWith(toUpper, materializer);
 </code></pre>

<p>
The connection from the client is established by calling <code>outgoingConnection</code> on the
<code>akka.stream.javadsl.Tcp</code> extension and attaching the corresponding flow to the input
<code>Source</code> and output <code>Sink</code>.
</p>

<pre><code>
CompletionStage&lt;ByteString> result = responseStream.runFold(
  ByteString.empty(), (acc, in) -> acc.concat(in), materializer);
</code></pre>

<p>
In this sample the client sends a sequence of characters one-by-one to the server, aggregates the replies
into a single <code>ByteString</code>, and finally prints that.
</p>

<p>
Try to run the <code>sample.stream.TcpEcho</code> class
by selecting it in the 'Main class' menu in the <a href="#run" class="shortcut">Run</a> tab
and click the 'Run' button.
</p>

<p>
That runs the client and server in the same JVM process. It can be more
interesting to run them in separate processes. Run the following commands in separate
terminal windows.
</p>

<pre><code>
&lt;path to activator dir&gt;/activator "run-main sample.stream.TcpEcho server 0.0.0.0 6001"
</code></pre>

<pre><code>
&lt;path to activator dir&gt;/activator "run-main sample.stream.TcpEcho client 127.0.0.1 6001"
</code></pre>

<p>
You can also interact with the server with telnet:
</p>

<pre><code>
telnet 127.0.0.1 6001
</code></pre>

<p>
Type a few characters in the telnet session and press enter to see them echoed back to the terminal.
</p>

</div>
<div>
<h2>Links</h2>

<ul>
<li><a href="http://doc.akka.io/docs/akka/2.4.6/java/stream/index.html" target="_blank">Akka Streams 2.4.6 Documentation</a></li>
<li><a href="http://doc.akka.io/api/akka/2.4.6/#akka.stream.javadsl.package" target="_blank">Akka Streams 2.4.6 API documentation</a></li>
<li><a href="http://www.reactive-streams.org/" target="_blank">Reactive Streams</a></li>
</ul>

</div>


</body>
</html>
